package day03.window;

import beans.SensorReading;
import custom.MyCountAggregateFunction;
import org.apache.commons.collections.IteratorUtils;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * Flink stream-processing API demo: window functions.
 *
 * <p>Builds three windowed pipelines on the same {@link SensorReading} stream (one incremental
 * aggregation and two full-window functions) and prints their results.
 *
 * @author lvbingbing
 * @date 2022-01-01 13:05
 */
public class FlinkWindow03 {
    /**
     * Program entry point: wires up the window-function demos and runs the job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        // 1. Create the FlinkWindow00 helper. Per the original author's note, its parameterized
        //    constructor initializes the environment and reads data from a socket text stream.
        int parallelism = 1;
        FlinkWindow00 flinkWindow = new FlinkWindow00(parallelism);
        // 2. Obtain the execution environment.
        StreamExecutionEnvironment env = flinkWindow.getEnv();
        // 3. Register the window-function demo pipelines.
        studyWindowsFunction(flinkWindow.getSingleOutputStreamOperator());
        // 4. Trigger job execution.
        env.execute();
    }

    /**
     * Registers the window-function demos.
     *
     * <ol>
     *   <li>incremental aggregation functions</li>
     *   <li>full-window functions</li>
     * </ol>
     *
     * @param sensorReadingStream the stream of sensor readings to window
     */
    private static void studyWindowsFunction(SingleOutputStreamOperator<SensorReading> sensorReadingStream) {
        // 1. Incremental aggregation function.
        studyIncrementalAggregationFunctions(sensorReadingStream);
        // 2. Full-window functions.
        studyFullWindowFunctions(sensorReadingStream);
    }


    /**
     * Incremental aggregation demo.
     *
     * <p>An incremental aggregation function performs its computation as each record arrives and
     * only keeps a small piece of state. Typical incremental aggregation functions are
     * {@code ReduceFunction} and {@code AggregateFunction}.
     *
     * <p>Here the stream is keyed by the {@code id} field, grouped into 15-second time windows and
     * aggregated with {@link MyCountAggregateFunction}; the {@code Integer} results are printed.
     *
     * @param sensorReadingStream the stream of sensor readings to window
     */
    private static void studyIncrementalAggregationFunctions(SingleOutputStreamOperator<SensorReading> sensorReadingStream) {
        SingleOutputStreamOperator<Integer> tumblingTimeWindowsDs = sensorReadingStream.keyBy("id")
                .timeWindow(Time.seconds(15))
                .aggregate(new MyCountAggregateFunction());
        tumblingTimeWindowsDs.print("增量聚合函数");
    }

    /**
     * Full-window function demo.
     *
     * <p>A full-window function first collects all records of the window and iterates over them
     * when the window is evaluated. Full-window functions are {@code ProcessWindowFunction} and
     * {@code WindowFunction}.
     *
     * <p>Two independent pipelines are built on the same input, each keyed by {@code id} with
     * 15-second time windows: one using {@link MyProcessWindowFunction} via {@code process} and one
     * using {@link MyWindowFunction} via {@code apply}. Both results are printed.
     *
     * @param sensorReadingStream the stream of sensor readings to window
     */
    private static void studyFullWindowFunctions(SingleOutputStreamOperator<SensorReading> sensorReadingStream) {
        SingleOutputStreamOperator<Tuple3<String, Long, Integer>> processWindowFunctionDs = sensorReadingStream.keyBy("id")
                .timeWindow(Time.seconds(15))
                .process(new MyProcessWindowFunction());
        processWindowFunctionDs.print("全窗口函数-processWindowFunction");

        SingleOutputStreamOperator<Tuple3<String, Long, Integer>> applyWindowFunctionDs = sensorReadingStream.keyBy("id")
                .timeWindow(Time.seconds(15))
                .apply(new MyWindowFunction());
        applyWindowFunctionDs.print("全窗口函数-WindowFunction");
    }

    /**
     * Counts the elements of an iterable by walking it once.
     *
     * <p>Used instead of copying the window contents into a temporary list just to read its size.
     *
     * @param elements the elements to count
     * @return the number of elements produced by one iteration
     */
    private static int countElements(Iterable<?> elements) {
        int count = 0;
        for (Object ignored : elements) {
            count++;
        }
        return count;
    }

    /**
     * Full-window function that emits {@code (id, window end timestamp, element count)} for each
     * evaluated window, implemented as a {@link ProcessWindowFunction}.
     */
    public static class MyProcessWindowFunction extends ProcessWindowFunction<SensorReading, Tuple3<String, Long, Integer>, Tuple, TimeWindow> {
        /**
         * Emits one tuple describing the evaluated window.
         *
         * @param key      the window key; its first field is read as the sensor id
         * @param context  the context giving access to the window
         * @param elements all elements collected for the window
         * @param out      the collector receiving {@code (id, windowEnd, count)}
         * @throws Exception declared by the overridden method
         */
        @Override
        public void process(Tuple key, Context context, Iterable<SensorReading> elements, Collector<Tuple3<String, Long, Integer>> out) throws Exception {
            String id = key.getField(0);
            long windowEnd = context.window().getEnd();
            int count = countElements(elements);
            out.collect(new Tuple3<>(id, windowEnd, count));
        }
    }

    /**
     * Full-window function that emits {@code (id, window end timestamp, element count)} for each
     * evaluated window, implemented as a {@link WindowFunction}.
     */
    public static class MyWindowFunction implements WindowFunction<SensorReading, Tuple3<String, Long, Integer>, Tuple, TimeWindow> {
        /**
         * Emits one tuple describing the evaluated window.
         *
         * @param key    the window key; its first field is read as the sensor id
         * @param window the window being evaluated
         * @param input  all elements collected for the window
         * @param out    the collector receiving {@code (id, windowEnd, count)}
         * @throws Exception declared by the implemented method
         */
        @Override
        public void apply(Tuple key, TimeWindow window, Iterable<SensorReading> input, Collector<Tuple3<String, Long, Integer>> out) throws Exception {
            String id = key.getField(0);
            long windowEnd = window.getEnd();
            int count = countElements(input);
            out.collect(new Tuple3<>(id, windowEnd, count));
        }
    }
}
